#include "config.h"

#define DBG_SUBSYS S_LIBCONTROL

#include "lich_api.h"

#include "volume_ctl.h"
#include "volume_ctl_internal.h"

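/**
 * @file Flat Task
 *
 * Once a cloned volume has been flattened it no longer depends on its source
 * snapshot. For every chunk that does not exist on the volume, try to read
 * its content through the snapshot:
 * - data can be read: create the corresponding chunk (allocate and write)
 * - data cannot be read: thin provisioning skips it; thick provisioning allocates it
 *
 * Flattening runs in two phases:
 * - create the flat task
 * - process the flat task in the background (admin dispatches it to the owning controller)
 *
 * @note All metadata updates on one subvol are serialized, so to improve
 *       performance the chunk traversal order has to be arranged sensibly
 *       (similar to RAID striping)
 * @note Batch processing is difficult
 *
 * @see admin task scheduling: bh_task.c
 * @see snapshot read: volume_proto_snapshot.c
 *
 * @todo admin cannot see the EREMCHG error code, so it cannot update its local cache (@see md_map_getsrv)
 */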


/**
 * @brief Phase one of flattening: mark the volume and persist a flat task.
 *
 * Expects three variadic arguments, in order: `const volid_t *volid`,
 * `int idx` (unused) and `int force`.
 *
 * Steps:
 * - look up the volume entry and take its read lock;
 * - copy the pool name and chunk id out of table1;
 * - call volume_proto->snapshot_flat(volume_proto, 1); when @c force is set,
 *   EBUSY / EPERM from that call are ignored;
 * - drop the lock and the entry reference, then create the persistent
 *   background task with bh_task_create(pool, FLAT_ROOT, &chkid).
 *
 * bh_task_create() results:
 * - EAGAIN: retried through USLEEP_RETRY;
 * - EEXIST: treated as success after a retry or when @c force is set,
 *   otherwise reported as EINVAL;
 * - anything else is returned as is.
 *
 * @return 0 on success; ENAMETOOLONG if the pool name does not fit the local
 *         buffer; otherwise the error code of the failing step.
 */
static int __volume_ctl_snapshot_flat(va_list ap)
{
        int retry = 0;
        const volid_t *volid = va_arg(ap, volid_t *);
        const int idx = va_arg(ap, int);
        const int force = va_arg(ap, int);
        // NOTE(review): va_end is issued here on a va_list received as a
        // parameter; confirm this matches what core_request expects.
        va_end(ap);

        (void)idx;

        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        char pool[MAX_NAME_LEN];
        chkid_t chkid;

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        // TODO convert into a background task, @see volume_bh.c
        volume_proto = cent->value;

        // Take private copies of the attributes needed after the lock is
        // released. The pool name is length-checked first so that an
        // oversized name cannot overflow the stack buffer.
        if (unlikely(strlen(volume_proto->table1.pool) >= sizeof(pool))) {
                ret = ENAMETOOLONG;
                GOTO(err_lock, ret);
        }

        strcpy(pool, volume_proto->table1.pool);
        chkid = volume_proto->table1.chkid;

        ret = volume_proto->snapshot_flat(volume_proto, 1);
        if (unlikely(ret)) {
                if ((ret == EBUSY || ret == EPERM) && force) {
                        // forced: ignore the error and carry on
                        DWARN("force flat\n");
                } else {
                        GOTO(err_lock, ret);
                }
        }

        __volume_ctl_unlock(cent);

        __volume_ctl_release(cent);

        // Persist the background task, @see bh_task.c
retry:
        ret = bh_task_create(pool, FLAT_ROOT, &chkid);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, gloconf.rpc_timeout * 2, (1000 * 1000));
                } else if (ret == EEXIST) {
                        if (retry || force) {
                                // retry and force are plain ints, hence %d
                                DINFO("retry success, retry %d force %d\n", retry, force);
                        } else {
                                ret = EINVAL;
                                GOTO(err_ret, ret);
                        }
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * @brief Flatten a cloned volume (entry point).
 *
 * Hands the request to core_request() under the name "snapshot_flat", keyed by
 * core_hash(volid), with __volume_ctl_snapshot_flat as the handler.
 *
 * @param volid  id of the cloned volume to flatten
 * @param idx    forwarded to the handler, which currently ignores it
 * @param force  non-zero makes the handler tolerate EBUSY/EPERM from the flat
 *               marking step and an already existing flat task
 * @return 0 on success, otherwise the error code reported by core_request()
 */
int volume_ctl_snapshot_flat(const volid_t *volid, int idx, int force)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_flat",
                               __volume_ctl_snapshot_flat, volid, idx, force);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * @brief Task body that flattens one chunk of a cloned volume.
 *
 * @param _arg  a heap-allocated snapshot_flat_arg_t; it is always freed here,
 *              on success as well as on failure.
 *
 * Under the volume read lock the task:
 * - bails out with ECANCELED if the volume carries __FILE_ATTR_DELETE__;
 * - bumps flat_ctx.stat.chknum;
 * - calls volume_proto->snapshot_flat_bh1() for arg->chunk_idx.
 *
 * When ENABLE_SNAPSHOT_ROLLBACK_MULTITASK is enabled, the shared task counter
 * is decremented and the result code (0 on success) is broadcast on arg->cond.
 *
 * NOTE(review): __volume_ctl_flat_bh1 increments the counter and waits on the
 * condition unconditionally; if this macro can ever be 0/undefined the parent
 * would presumably never be woken - confirm the macro is always enabled.
 */
static void __flat_bh1_chunk(void *_arg)
{
        snapshot_flat_arg_t *arg = _arg;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        chkid_t chkid;
        int deleting;
        int ret = 0;

        ANALYSIS_BEGIN(0);

        cent = arg->cent;
        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volume_proto = cent->value;

        // a volume that is being deleted must not be flattened any further
        deleting = volume_proto->table1.fileinfo.attr & __FILE_ATTR_DELETE__;
        if (unlikely(deleting)) {
                ret = ECANCELED;
                GOTO(err_lock, ret);
        }

        // chkid is only needed for the debug trace below
        fid2cid(&chkid, &volume_proto->chkid, arg->chunk_idx);
        volume_proto->flat_ctx.stat.chknum++;

        ret = volume_proto->snapshot_flat_bh1(volume_proto, arg->chunk_idx, arg->thin);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        DBUG("chunk %s chknum %ju clone %ju snap_read %ju vol_write %ju %ju %ju\n",
             id2str(&chkid),
             volume_proto->flat_ctx.stat.chknum,
             volume_proto->flat_ctx.stat.clone,
             volume_proto->flat_ctx.stat.snap_read,
             volume_proto->flat_ctx.stat.vol_write,
             volume_proto->flat_ctx.stat.vol_create,
             volume_proto->flat_ctx.stat.vol_createwith);

        __volume_ctl_unlock(cent);

        ANALYSIS_QUEUE(0, 100 * 1000, "__flat_bh1_chunk");

        // success: report 0 to the waiter through the shared exit below
        ret = 0;
        goto done;

err_lock:
        __volume_ctl_unlock(cent);
err_ret:
done:
#if ENABLE_SNAPSHOT_ROLLBACK_MULTITASK
        (*arg->task_count)--;
        co_cond_broadcast(arg->cond, ret);
#endif
        yfree((void **)&arg);
}

/**
 * @brief Background phase 1: flatten every chunk of the volume.
 *
 * Reads the LICH_SYSTEM_ATTR_THIN xattr to decide between thin and thick
 * handling, then walks the chunk indexes produced by the chkid generator and
 * schedules one __flat_bh1_chunk task per chunk, throttled by
 * SNAPSHOT_FLAT_MULTITASK. Finally waits for all outstanding tasks.
 *
 * Error policy while throttling:
 * - ECANCELED from a task: stop scheduling, drain, return ECANCELED;
 * - any other error on a thin volume: stop scheduling, drain, return it;
 * - any other error on a thick volume: count it and keep going.
 * Errors seen while draining at the end are counted as well; a non-zero count
 * turns into EAGAIN.
 *
 * NOTE(review): a task's result is only observed through the return value of
 * co_cond_wait2(); confirm no result can be missed when several tasks finish
 * between two waits.
 *
 * @param cent    cache entry of the volume (caller keeps the reference)
 * @param chknum  number of chunks to visit
 * @return 0 on success, EAGAIN if some chunk failed, or the error that
 *         aborted the run
 */
static int __volume_ctl_flat_bh1(mcache_entry_t *cent, uint32_t chknum)
{
        uint32_t i;
        int ret, err_count = 0, thin;
        snapshot_flat_arg_t *arg;
        int task_count = 0;
        generator_t gen;
        co_cond_t cond;
        volume_proto_t *volume_proto;
        char value[MAX_INFO_LEN];
        int valuelen = MAX_INFO_LEN;

        memset(value, 0x00, MAX_INFO_LEN);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // TODO use fileinfo
        volume_proto = cent->value;
        ret = volume_proto->xattr_get(volume_proto, LICH_SYSTEM_ATTR_THIN, value, &valuelen);

        __volume_ctl_unlock(cent);

        // a missing key is fine: value stays empty and the default applies
        if (unlikely(ret) && ret != ENOKEY)
                GOTO(err_ret, ret);

        /*
         * value == "thick" --> thin = 0
         * value == "thin"  --> thin = 1
         * value is empty   --> thin = 1 (default)
         */
        thin = (strcmp(value, "thick") != 0);

        co_cond_init(&cond);

        /**
         * Performance: run chunks in parallel.
         * Within 16000 chunks the metadata updates are serialized, so the
         * generator decides the visiting order instead of a plain 0..chknum loop.
         */
        chkid_generator_init(&gen, chknum, FILE_PROTO_EXTERN_ITEM_COUNT);

        while (!chkid_generator(&gen, &i)) {
                // NOTE(review): '>' lets SNAPSHOT_FLAT_MULTITASK + 1 tasks run
                // at once - confirm whether '>=' was intended.
                while (task_count > SNAPSHOT_FLAT_MULTITASK) {
                        ret = co_cond_wait2(&cond, "snapshot_flat");
                        if (!ret)
                                continue;

                        if (ret == ECANCELED) {
                                DWARN("i %u cancelled\n", i);
                                goto err_canceled;
                        }

                        if (thin)
                                goto err_canceled;

                        err_count++;
                        DWARN("i %u chknum %u ret %d\n", i, chknum, ret);
                }

                ret = ymalloc((void **)&arg, sizeof(snapshot_flat_arg_t));
                if (unlikely(ret)) {
                        GOTO(err_canceled, ret);
                }

                // ownership of arg moves to the task, which frees it
                arg->cent = cent;
                arg->chunk_idx = i;
                arg->task_count = &task_count;
                arg->cond = &cond;
                arg->thin = thin;

                task_count++;
                schedule_task_new("snapshot_flat", __flat_bh1_chunk, arg, -1);
        }

        // TODO if error task
        // drain: task_count and cond live on this stack frame, so every task
        // must have finished before returning
        while (task_count > 0) {
                ret = co_cond_wait2(&cond, "snapshot_flat");
                if (unlikely(ret)) {
                        DWARN("ret %d\n", ret);
                        err_count++;
                }
        }

        if (err_count) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;

err_canceled:
        // keep the original error in ret; only wait for the tasks to finish
        while (task_count > 0) {
                co_cond_wait2(&cond, "snapshot_flat");
        }
err_ret:
        return ret;
}

/**
 * @brief Background phase 2: finish the flat operation on the volume.
 *
 * Under the volume read lock:
 * - calls table1->snapshot_flat(table1, 0) to switch the clone/flat flag off;
 * - removes the LICH_SYSTEM_ATTR_SOURCE xattr (best effort: a failure is only
 *   logged and does not change the return value).
 *
 * @param cent  cache entry of the volume
 * @return 0 on success, otherwise the error from locking or from
 *         table1->snapshot_flat()
 */
const int __volume_ctl_flat_bh2(mcache_entry_t *cent)
{
        volume_proto_t *volume_proto;
        table1_t *table1;
        int ret, rm_ret;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        volume_proto = cent->value;
        table1 = &volume_proto->table1;

        // Ordering matters: this has to run before the source attribute is
        // cleared. clone depends on source, and vol info depends on source as
        // well, so the dependent item is cleaned up before what it depends on.

        // flat done, set clone/flat flag off
        ret = table1->snapshot_flat(table1, 0);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        // If a failure happens at this point, retrying the flat operation
        // returns EPERM.

        // remove xattr source
        // TODO table1 lock
        // TODO cannot be moved into snapshot_flat: that would deadlock
        rm_ret = table1->xattr_remove(table1, LICH_SYSTEM_ATTR_SOURCE);
        if (unlikely(rm_ret)) {
                // deliberately not fatal: only warn
                DWARN("ret %d\n", rm_ret);
        }

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/**
 * @brief Background handler of a persisted flat task.
 *
 * Looks up the volume, snapshots the pool name, the __FILE_ATTR_FLAT__ flag
 * and the chunk count under the read lock, and then:
 * - if the flag is set, runs __volume_ctl_flat_bh1() (flatten all chunks)
 *   followed by __volume_ctl_flat_bh2() (clear flag / source attribute);
 * - removes the task with bh_task_remove(pool, FLAT_ROOT, volid).
 *
 * The entry reference taken by __volume_ctl_get() is released on every path.
 *
 * @param volid  id of the volume being flattened
 * @return 0 on success; ENAMETOOLONG if the pool name does not fit the local
 *         buffer; otherwise the error code of the failing step
 */
int volume_ctl_snapshot_flat_bh(const volid_t *volid)
{
        int ret, need_flat;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        char pool[MAX_NAME_LEN];
        uint32_t chknum;

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG("get "CHKID_FORMAT" %p\n", CHKID_ARG(volid), cent);

        // NOTE volume_proto may only be accessed while the lock is held
        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;

        // Length-check before copying so an oversized pool name cannot
        // overflow the stack buffer.
        if (unlikely(strlen(volume_proto->table1.pool) >= sizeof(pool))) {
                __volume_ctl_unlock(cent);
                ret = ENAMETOOLONG;
                GOTO(err_release, ret);
        }

        strcpy(pool, volume_proto->table1.pool);
        need_flat = (volume_proto->table1.fileinfo.attr & __FILE_ATTR_FLAT__);
        chknum = size2chknum(volume_proto->table1.fileinfo.size, &volume_proto->table1.fileinfo.ec);

        __volume_ctl_unlock(cent);

        if (need_flat) {
                ret = __volume_ctl_flat_bh1(cent, chknum);
                if (unlikely(ret))
                        GOTO(err_release, ret);

                ret = __volume_ctl_flat_bh2(cent);
                if (unlikely(ret))
                        GOTO(err_release, ret);
        }

        // remove the task
        ret = bh_task_remove(pool, FLAT_ROOT, volid);
        if (unlikely(ret))
                GOTO(err_release, ret);

        __volume_ctl_release(cent);

        return 0;
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}
